Elixirのノード接続とメッセージ通信
Elixirでプログラミングをするときは頻繁にプロセス間でメッセージ通信を行います。
Elixirのプロセスはネットワークに透過的なため、別ノード(別VM)へのプロセスへメッセージを送信することができます。
また、別ホスト(別サーバ)で動いているプロセスへも送信できます。
今回は異なるVM間、異なるホスト間のメッセージ通信を試してみます。
ノード接続
メッセージ通信を行うノード同士で接続します。
まず最初に名前付きでノードを起動しましょう。
(@以降の内容はホストネームです。hostname -s
で確認できます)
$ iex --sname foo iex(foo@kanmo)>
次に別のターミナルウィンドウを開き、同じく名前付きのノードを起動します。
$ iex --sname bar iex(bar@kanmo)>
それでは、ノード同士で接続します。
iex(foo@kanmo)> Node.connect(:bar@kanmo) true
接続できました、確認してみましょう。
iex(foo@kanmo)> Node.list [:bar@kanmo]
別のノードでも確認してみます
iex(bar@kanmo)> Node.list [:foo@kanmo]
両方のノードで接続を確認できました。
ノードの自動接続
ここで面白い機能を一つ、ノードの自動接続をご紹介します。
まずクラスタに新しくノード追加します。
$ iex --sname baz iex(baz@kanmo)> Node.connect(:foo)
新しくノードbazをノードfooに接続しました。 追加したノードbazの接続リストを確認してみましょう。
iex(baz@kanmo)> Node.list [:foo@kanmo, :bar@kanmo]
接続していないはずの:bar@kanmo
とも接続しています。Elixirのノードはクラスタ内のノードに接続すると他のすべてのノードにも自動的に接続します。
メッセージ通信
それではノードから別のノードへメッセージを送信します。事前にモジュールを作成しておきます。
ping_pong.ex
# サーバプロセスのモジュール defmodule PingPong do def server_start do pid = spawn(__MODULE__, :loop, []) :global.register_name(:server, pid) end def loop do # 2. クライアントプロセスからのメッセージを受信する receive do {sender, msg} -> IO.puts "received message: #{msg}" # 3. クライアントプロセスへメッセージを送信する send sender, {:ok, "#{msg} Pong!!!!!"} end loop end end # クライアントプロセスのモジュール defmodule Client do def start do # 1. サーバプロセスへメッセージを送信する send :global.whereis_name(:server), {self, "Ping"} # 4. サーバプロセスからメッセージを受信する receive do {:ok, message} -> IO.puts message end end end
それぞれVMを立ち上げてコードをコンパイルします。
$ iex --sname foo iex(foo@kanmo)> c("ping_pong.ex")
$ iex --sname bar iex(bar@kanmo)> c("ping_pong.ex")
別ノードに接続をしてから、サーバプロセスを立ち上げます。
iex(foo@kanmo)> Node.connect(:"bar@kanmo") iex(foo@kanmo)> PingPong.server_start :yes
それではクライアントプロセスを立ち上げてプログラムを試してみましょう。別ノードで起動します。
iex(bar@kanmo)> Client.start Ping Pong!!!!! :ok
サーバプロセスからメッセージを受け取って出力できています。 サーバプロセスを起動しているノードでも、クライアントプロセスからメッセージを受信しているのが分かります。
iex(hoge@kanmo)> PingPong.server_start :yes received message: Ping
ノードから別ノードへメッセージを送信することができました!
別ホストのノードと接続、メッセージ送信。
次に別ホスト(別サーバ)のノードで動いているプロセスとメッセージ通信を行いましょう。
今回はEC2で試します。
セキュリティグループの設定
最初にノード間が通信するためのポートをセキュリティグループに設定します。
ポートは4369
番と9100-9155
番をInboundに設定します。
(このサンプルは一時的に使用するのみなのでSource IPに0.0.0.0/0
に設定していますが、
実際に使用する場合は適切なセキュリティグループを設定してください。)
ノードの起動
それではノードを起動しましょう。引数にノードのメッセージ通信用のポート番号を指定します。
また、--cookie
オプションも指定します。通信するノードはここで指定するcookieの値を同じ値にする必要があります。
(別のノードからのアクセスに対する基本的なセキュリティ機能です)。
$ iex --name [email protected] --cookie pingpong --erl "-kernel inet_dist_listen_min 9100 inet_dist_listen_max 9155" iex([email protected])1> c("ping_pong.ex")
別のノードでも起動します。
$ iex --name [email protected] --cookie pingpong --erl "-kernel inet_dist_listen_min 9100 inet_dist_listen_max 9155" iex([email protected])1> c("ping_pong.ex")
ノード接続、実行
先ほどと同様にノード間で接続をしてメッセージ通信を行います。
サーバ
iex([email protected])> Node.connect(:"[email protected]") iex([email protected])> PingPong.server_start :yes
クライアントプロセス
iex([email protected])> Client.start Ping Pong!!!!! :ok
うまくいきました!別ホストのプロセスともメッセージ通信に成功しています。
Task.async & Task.await
ノード間のメッセージ通信とは直接関係ないですが、Elixirの非同期処理の手法を一つご紹介します。
Elixirでは複数プロセスを立ち上げバックグラウンドでプロセスが処理を行い、結果を待ち受ける非同期処理を簡単に書けます。
以下は階乗計算をバックグラウンドプロセスで実行する例です。
defmodule Fact do def of(0), do: 1 def of(n), do: n * Fact.of(n-1) end Task.async(fn -> Fact.of(10) end) |> &Task.await(&1) |> IO.puts
複数ホスト、複数ノードでの非同期処理
それでは、ここまでで試した機能を使ってもう少し大きなコードを書いてみます。
2台のEC2でそれぞれノードを立ち上げ非同期処理を行います。
コードの内容は何でも良いのですが、このサンプルでは手元にある複数企業の株価コードを元に株価とその関連情報を取得します。
プロジェクトの作成
mix new stocker
クラスタの設定
クラスタを構築するノードを設定します(config/config.ex)。
use Mix.Config config :stocker, master_node: :'[email protected]' config :stocker, slave_nodes: [:'[email protected]', :'[email protected]']
ノード接続
実行時にそれぞれのノードを接続します。以下のように接続します。
def main do Application.get_env(:stocker, :master_node) |> Node.start Application.get_env(:stocker, :slave_nodes) |> Enum.each(&Node.connect(&1)) (略) end
非同期処理
取得する企業の情報は1000件以上あるので逐次処理でやっては時間がかかり過ぎてしまいます。
Task.async & Task.awaitでプロセスを生成し、非同期で処理をします。
def fetch_stocks(nodes, code_per_node) (略) Enum.zip(nodes, code_per_node) |> Enum.flat_map(fn {node, codes} -> Enum.map(codes, fn code -> Task.Supervisor.async({Stocker.TasksSupervisor, node}, Stocker.Worker, :start, [String.to_integer(code)]) end) end) |> Enum.map(&Task.await(&1, :infinity)) |> print_results end
実行
あとはビジネスロジックを記述して実行します。
今回はビジネスロジックは関係ないので省略します。やっていることはスクレイピングで各企業の株価情報を取得して最後に標準出力するだけです。
それぞれのノードでプロセスが起動し処理が行われれば成功です。
まとめ
異なるVM間でメッセージ通信を行うことは普通のプログラミング言語ではなかなか大変です、それが別ホストのVMとなるとさらに実装が大変になります。
Elixirでは通信を行うための複雑な部分はVMで行ってくれるため、他の言語よりも簡単にVM間でのメッセージ通信を実装できます。
興味ある方は是非、メッセージ通信を異なるVM間、ホスト間で試してみてください。